Skip to content

[SPARK-56961][SQL] Pass all options while loading changelog#56044

Open
aokolnychyi wants to merge 3 commits into
apache:masterfrom
aokolnychyi:spark-56961
Open

[SPARK-56961][SQL] Pass all options while loading changelog#56044
aokolnychyi wants to merge 3 commits into
apache:masterfrom
aokolnychyi:spark-56961

Conversation

@aokolnychyi
Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

This PR passes all specified options while loading changelogs.

Why are the changes needed?

These changes are needed to make the API usable in connectors like Iceberg and Delta.

Does this PR introduce any user-facing change?

The functionality hasn't been released yet.

How was this patch tested?

This PR comes with tests.

Was this patch authored or co-authored using generative AI tooling?

Claude Code v2.1.145.

*
* @since 4.2.0
*/
@Evolving
public class ChangelogInfo {
Copy link
Copy Markdown
Contributor Author

@aokolnychyi aokolnychyi May 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should rename this given that we will have to pass options to loadTable as well. Right now, we already have TableInfo that we use for CREATE table cases. We will NOT be able to use TableInfo for loading tables. So ChangelogInfo conflicts with TableInfo as it is used for loading, not creation.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another alternative name to consider is ChangelogParameters but context seems a better fit.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

@aokolnychyi aokolnychyi May 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My proposal would be:

loadTable(ident, tableContext (timeTravelSpec, writePrivileges), options)
 - by default delegates to existing loadTable() methods
loadChangelog(ident, changelogContext (range, deduplicationMode, etc), options)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We concluded that passing all options to load methods for tables and changelog is a requirement for external connectors like Delta and Iceberg.

@aokolnychyi
Copy link
Copy Markdown
Contributor Author

cc @huaxingao, this is one of the items we discussed on the dev list for 4.2

aokolnychyi and others added 2 commits May 21, 2026 14:55
…log/TableCatalog.java

Co-authored-by: Gengliang Wang <gengliang@apache.org>
…ysis/RelationChanges.scala

Co-authored-by: Gengliang Wang <gengliang@apache.org>
Copy link
Copy Markdown
Contributor

@cloud-fan cloud-fan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR does two things: renames ChangelogInfoChangelogContext (and the associated ChangelogInfoUtils + the changelogInfo field on RelationChanges/ChangelogTable), and adds a CaseInsensitiveStringMap options parameter to TableCatalog.loadChangelog so connectors like Iceberg/Delta receive the full raw options map.

The rename is anticipatory: per @aokolnychyi's inline thread, the planned shape is loadTable(ident, tableContext, options) and loadChangelog(ident, changelogContext, options). Using ChangelogContext avoids the conflict with TableInfo (which is for CREATE).

Options-flow design: all four frontends (AstBuilder, DataFrameReader, DataStreamReader, SparkConnectPlanner) build a single CaseInsensitiveStringMap and pass it to both the UnresolvedRelation and the ChangelogContext builder. RelationResolution.resolveChangelog then forwards u.options to loadChangelog — so options stay as a single source of truth on UnresolvedRelation rather than being duplicated on RelationChanges. Reasonable.

LGTM overall — just a few test-coverage nits and an opportunistic naming cleanup inline.


val opts = cat.lastOptions
assert(opts.isDefined)
assert(opts.get.get("customOption") == "customValue")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Javadoc on loadChangelog says options "include the CDC-recognized keys (range, deduplication mode, etc.) that are also parsed into context" — but this test only asserts the custom key is forwarded. Worth also asserting a CDC-recognized key is present so the Javadoc claim is pinned by a test:

Suggested change
assert(opts.get.get("customOption") == "customValue")
assert(opts.get.get("customOption") == "customValue")
assert(opts.get.get("startingVersion") == "1")

assert(range.endingVersion().get() == "5")
}

test("user-defined options are forwarded to loadChangelog") {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test covers the DataFrame batch path, but the SQL WITH-clause path (AstBuilder) and the streaming paths (DataStreamReader, streaming SQL) each independently construct the UnresolvedRelation.options in their own frontend before RelationResolution dispatches. Would you mind adding smoke tests for at least the SQL and streaming DataFrame paths so a future regression in any frontend is caught here? (The end-to-end suite touches lastChangelogContext for those paths but not lastOptions.)

Comment on lines 195 to +212
private def evaluateRequirements(
changelog: Changelog,
options: ChangelogInfo): PostProcessingRequirements = {
options: ChangelogContext): PostProcessingRequirements = {
val requiresCarryOverRemoval =
options.deduplicationMode() != ChangelogInfo.DeduplicationMode.NONE &&
options.deduplicationMode() != ChangelogContext.DeduplicationMode.NONE &&
changelog.containsCarryoverRows()
val requiresUpdateDetection =
options.computeUpdates() && changelog.representsUpdateAsDeleteAndInsert()
val requiresNetChanges =
options.deduplicationMode() == ChangelogInfo.DeduplicationMode.NET_CHANGES &&
options.deduplicationMode() == ChangelogContext.DeduplicationMode.NET_CHANGES &&
changelog.containsIntermediateChanges()

// If carry-overs are surfaced and update detection is enabled without carry-over
// removal, carry-overs would be falsely classified as updates, leading to wrong
// results. Hence we throw.
if (requiresUpdateDetection &&
changelog.containsCarryoverRows() &&
options.deduplicationMode() == ChangelogInfo.DeduplicationMode.NONE) {
options.deduplicationMode() == ChangelogContext.DeduplicationMode.NONE) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that options: CaseInsensitiveStringMap is a real distinct concept in this PR, naming a ChangelogContext value options here (and the doc just above — "user-provided ChangelogContext options") reads awkwardly. Since the PR is already updating every options. access in this method, mind renaming to context to match the rest of the rename?

Suggested change
private def evaluateRequirements(
changelog: Changelog,
options: ChangelogInfo): PostProcessingRequirements = {
options: ChangelogContext): PostProcessingRequirements = {
val requiresCarryOverRemoval =
options.deduplicationMode() != ChangelogInfo.DeduplicationMode.NONE &&
options.deduplicationMode() != ChangelogContext.DeduplicationMode.NONE &&
changelog.containsCarryoverRows()
val requiresUpdateDetection =
options.computeUpdates() && changelog.representsUpdateAsDeleteAndInsert()
val requiresNetChanges =
options.deduplicationMode() == ChangelogInfo.DeduplicationMode.NET_CHANGES &&
options.deduplicationMode() == ChangelogContext.DeduplicationMode.NET_CHANGES &&
changelog.containsIntermediateChanges()
// If carry-overs are surfaced and update detection is enabled without carry-over
// removal, carry-overs would be falsely classified as updates, leading to wrong
// results. Hence we throw.
if (requiresUpdateDetection &&
changelog.containsCarryoverRows() &&
options.deduplicationMode() == ChangelogInfo.DeduplicationMode.NONE) {
options.deduplicationMode() == ChangelogContext.DeduplicationMode.NONE) {
private def evaluateRequirements(
changelog: Changelog,
context: ChangelogContext): PostProcessingRequirements = {
val requiresCarryOverRemoval =
context.deduplicationMode() != ChangelogContext.DeduplicationMode.NONE &&
changelog.containsCarryoverRows()
val requiresUpdateDetection =
context.computeUpdates() && changelog.representsUpdateAsDeleteAndInsert()
val requiresNetChanges =
context.deduplicationMode() == ChangelogContext.DeduplicationMode.NET_CHANGES &&
changelog.containsIntermediateChanges()
// If carry-overs are surfaced and update detection is enabled without carry-over
// removal, carry-overs would be falsely classified as updates, leading to wrong
// results. Hence we throw.
if (requiresUpdateDetection &&
changelog.containsCarryoverRows() &&
context.deduplicationMode() == ChangelogContext.DeduplicationMode.NONE) {

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants